AWS IoT Analytics を CDK で構築してみた
はじめに
テントの中から失礼します、CX事業本部のてんとタカハシです!
前回の記事の続編として、AWS IoT Analytics を CDK で構築してみました。構築する内容などの詳細は下記の記事に書いていますので、そちらをご参照ください。
今回のソースコードは下記に置いています。
GitHub - iam326/aws-iot-analytics-sample-cdk
環境
$ sw_vers
ProductName: Mac OS X
ProductVersion: 10.15.7
BuildVersion: 19H2

$ aws --version
aws-cli/2.0.28 Python/3.7.4 Darwin/19.6.0 botocore/2.0.0dev32

$ cdk --version
1.70.0 (build c145314)
スタックの実装
今回は下記サービスごとにスタックを分けています。
- Lambda
- AWS IoT Analytics
- AWS IoT Core
Lambda
AWS IoT Analytics の Pipeline で Lambda と連携するために Function を作成します。AWS IoT Analytics から Invoke できるようにパーミッションの設定も行います。
import * as cdk from '@aws-cdk/core';
import * as iam from '@aws-cdk/aws-iam';
import * as lambda from '@aws-cdk/aws-lambda';

/** Construction properties for {@link LambdaStack}. */
interface LambdaStackProps extends cdk.StackProps {
  /** Prefix used to build the Lambda function name. */
  projectName: string;
}

/**
 * Stack holding the Lambda function that is granted invoke permission for
 * the IoT Analytics service principal.
 *
 * The created function is exposed through the public `function` property.
 */
export class LambdaStack extends cdk.Stack {
  /** The Lambda function created by this stack. */
  public readonly function: lambda.Function;

  constructor(scope: cdk.Construct, id: string, props: LambdaStackProps) {
    super(scope, id, props);

    // Execution role carrying only the AWS-managed basic execution policy.
    const basicExecutionPolicy = iam.ManagedPolicy.fromAwsManagedPolicyName(
      'service-role/AWSLambdaBasicExecutionRole'
    );
    const executionRole = new iam.Role(this, 'lambdaExecutionRole', {
      assumedBy: new iam.ServicePrincipal('lambda.amazonaws.com'),
      path: '/',
      managedPolicies: [basicExecutionPolicy],
    });

    // The handler code is loaded from the local `src` directory.
    this.function = new lambda.Function(this, 'lambdaFunction', {
      functionName: `${props.projectName}_pipeline_lambda_function`,
      handler: 'index.lambda_handler',
      role: executionRole,
      runtime: lambda.Runtime.PYTHON_3_7,
      code: lambda.Code.fromAsset('src'),
    });

    // Let the IoT Analytics service principal invoke the function.
    this.function.addPermission('LambdaFunctionPermission', {
      principal: new iam.ServicePrincipal('iotanalytics.amazonaws.com'),
      action: 'lambda:InvokeFunction',
    });
  }
}
Lambda の実装は別ファイルに置いています。Pub されたデータの属性を、別属性にコピーするだけの処理です。
import logging
import sys

# Root logger at INFO level, writing timestamped records to stdout.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

stdout_handler = logging.StreamHandler(stream=sys.stdout)
stdout_handler.setFormatter(
    logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
)
logger.addHandler(stdout_handler)


def lambda_handler(event, context):
    """Copy each record's ``temperature`` attribute to ``temperature_copy``.

    ``event`` is iterated as a collection of records. Records without a
    ``temperature`` key are left untouched. Records are modified in place and
    the same ``event`` object is returned. ``context`` is not used.
    """
    logger.info("event before processing: {}".format(event))

    records_with_temperature = [
        record for record in event if 'temperature' in record
    ]
    for record in records_with_temperature:
        record['temperature_copy'] = record['temperature']

    logger.info("event after processing: {}".format(event))
    return event
AWS IoT Analytics
前回の記事と同じで様々なアクティビティを試しています。データセットを5分ごとに更新するようにして、当日のデータのみ取得するようにしています。
import * as cdk from '@aws-cdk/core';
import * as iotAnalytics from '@aws-cdk/aws-iotanalytics';

/** Construction properties for {@link IotAnalyticsStack}. */
interface IotAnalyticsStackProps extends cdk.StackProps {
  /** Prefix applied to every IoT Analytics resource name in this stack. */
  projectName: string;
  /** Name of the Lambda function used by the pipeline's lambda activity. */
  pipelineLambdaActivityFunctionName: string;
}

/**
 * Stack that defines the IoT Analytics channel, datastore, pipeline and
 * dataset. The channel is exposed through the public `channel` property.
 */
export class IotAnalyticsStack extends cdk.Stack {
  /** The IoT Analytics channel created by this stack. */
  public readonly channel: iotAnalytics.CfnChannel;

  constructor(scope: cdk.Construct, id: string, props: IotAnalyticsStackProps) {
    super(scope, id, props);

    const { projectName, pipelineLambdaActivityFunctionName } = props;

    const channelName = `${projectName}_iot_analytics_channel`;
    const iotAnalyticsChannel = new iotAnalytics.CfnChannel(
      this,
      'IotAnalyticsChannel',
      {
        channelName,
        channelStorage: {
          serviceManagedS3: {},
        },
      }
    );
    this.channel = iotAnalyticsChannel;

    const datastoreName = `${projectName}_iot_analytics_datastore`;
    const iotAnalyticsDatastore = new iotAnalytics.CfnDatastore(
      this,
      'IotAnalyticsDatastore',
      {
        datastoreName,
        datastoreStorage: {
          serviceManagedS3: {},
        },
      }
    );

    // Activities are chained through their `next` fields:
    // channel -> addAttributes -> removeAttributes -> filter -> math
    // -> lambda -> datastore.
    const iotAnalyticsPipeline = new iotAnalytics.CfnPipeline(
      this,
      'IotAnalyticsPipeline',
      {
        pipelineName: `${projectName}_iot_analytics_pipeline`,
        pipelineActivities: [
          {
            channel: {
              name: 'pipeline_channel_activity',
              channelName,
              next: 'pipeline_add_attributes_activity',
            },
            addAttributes: {
              name: 'pipeline_add_attributes_activity',
              attributes: {
                'device.id': 'id',
                'device.name': 'name',
              },
              next: 'pipeline_remove_attributes_activity',
            },
            removeAttributes: {
              name: 'pipeline_remove_attributes_activity',
              attributes: ['device'],
              next: 'pipeline_filter_activity',
            },
            filter: {
              name: 'pipeline_filter_activity',
              filter: 'temperature >= 10 AND temperature <= 40',
              next: 'pipeline_math_activity',
            },
            math: {
              name: 'pipeline_math_activity',
              attribute: 'temperature_f',
              math: 'temperature * 1.8 + 32',
              next: 'pipeline_lambda_activity',
            },
            lambda: {
              name: 'pipeline_lambda_activity',
              batchSize: 1,
              lambdaName: pipelineLambdaActivityFunctionName,
              next: 'pipeline_datastore_activity',
            },
            datastore: {
              name: 'pipeline_datastore_activity',
              datastoreName,
            },
          },
        ],
      }
    );
    // The pipeline refers to the channel and the datastore by plain name
    // only, so the creation order has to be declared explicitly.
    iotAnalyticsPipeline.addDependsOn(iotAnalyticsChannel);
    iotAnalyticsPipeline.addDependsOn(iotAnalyticsDatastore);

    const iotAnalyticsDataset = new iotAnalytics.CfnDataset(
      this,
      'IotAnalyticsDataset',
      {
        datasetName: `${projectName}_iot_analytics_dataset`,
        actions: [
          {
            actionName: 'SqlAction',
            queryAction: {
              sqlQuery: `SELECT * FROM ${datastoreName} WHERE __dt > current_date - interval '1' day`,
            },
          },
        ],
        retentionPeriod: {
          numberOfDays: 1,
          unlimited: false,
        },
        triggers: [
          {
            schedule: {
              // Rate expressions need a plural unit when the value is
              // greater than 1.
              scheduleExpression: 'rate(5 minutes)',
            },
          },
        ],
      }
    );
    // The dataset query names the datastore as a plain string as well.
    iotAnalyticsDataset.addDependsOn(iotAnalyticsDatastore);
  }
}
AWS IoT Core
トピック `iot/topic` に Pub されたデータを AWS IoT Analytics に受け渡すようにしています。
import * as cdk from '@aws-cdk/core';
import * as iot from '@aws-cdk/aws-iot';
import * as iam from '@aws-cdk/aws-iam';

/** Construction properties for {@link IotCoreStack}. */
interface IotCoreStackProps extends cdk.StackProps {
  /** Prefix applied to the IoT resource names in this stack. */
  projectName: string;
  /** Value placed after `cert/` when building the certificate ARN. */
  ioTCertificateName: string;
  /** Name of the IoT Analytics channel that receives the messages. */
  iotAnalyticsChannelName: string;
}

/**
 * Stack that defines the IoT policy, the thing, their attachments to a
 * certificate, and a topic rule that forwards messages published on
 * `iot/topic` to an IoT Analytics channel.
 */
export class IotCoreStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props: IotCoreStackProps) {
    super(scope, id, props);

    const scopedAws = new cdk.ScopedAws(this);
    const accountId = scopedAws.accountId;
    const region = scopedAws.region;

    const certificateArn = `arn:aws:iot:${region}:${accountId}:cert/${props.ioTCertificateName}`;

    // IoT policy allowing every IoT action on every resource.
    const policyName = `${props.projectName}_iot_policy`;
    const policy = new iot.CfnPolicy(this, 'IotPolicy', {
      policyDocument: {
        Version: '2012-10-17',
        Statement: [
          {
            Effect: 'Allow',
            Action: 'iot:*',
            Resource: '*',
          },
        ],
      },
      policyName,
    });

    const thingName = `${props.projectName}_iot_thing`;
    const thing = new iot.CfnThing(this, 'IotThing', { thingName });

    // The attachments refer to the policy and the thing by name, so each one
    // declares its dependency explicitly.
    const policyAttachment = new iot.CfnPolicyPrincipalAttachment(
      this,
      'IotPolicyPrincipalAttachment',
      {
        policyName,
        principal: certificateArn,
      }
    );
    policyAttachment.addDependsOn(policy);

    const thingAttachment = new iot.CfnThingPrincipalAttachment(
      this,
      'IotThingPrincipalAttachment',
      {
        thingName,
        principal: certificateArn,
      }
    );
    thingAttachment.addDependsOn(thing);

    // Role assumed by the IoT service, allowed to put messages into the
    // target channel only.
    const channelArn = `arn:aws:iotanalytics:${region}:${accountId}:channel/${props.iotAnalyticsChannelName}`;
    const batchPutMessageRole = new iam.Role(this, 'IotBatchPutMessageRole', {
      assumedBy: new iam.ServicePrincipal('iot.amazonaws.com'),
      path: '/',
    });
    batchPutMessageRole.addToPolicy(
      new iam.PolicyStatement({
        actions: ['iotanalytics:BatchPutMessage'],
        resources: [channelArn],
      })
    );

    // Topic rule forwarding everything published on `iot/topic` to the channel.
    new iot.CfnTopicRule(this, 'IotTopicRule', {
      ruleName: `${props.projectName}_iot_topic_rule`,
      topicRulePayload: {
        actions: [
          {
            iotAnalytics: {
              channelName: props.iotAnalyticsChannelName,
              roleArn: batchPutMessageRole.roleArn,
            },
          },
        ],
        awsIotSqlVersion: '2016-03-23',
        ruleDisabled: false,
        sql: "SELECT * FROM 'iot/topic'",
      },
    });
  }
}
スタックの依存関係
Lambda の Function 名や、AWS IoT Analytics の Channel 名を依存するスタックに渡してあげます。
#!/usr/bin/env node
import 'source-map-support/register';
import * as cdk from '@aws-cdk/core';
import { LambdaStack } from '../lib/lambda-stack';
import { IotAnalyticsStack } from '../lib/iot-analytics-stack';
import { IotCoreStack } from '../lib/iot-core-stack';

/** Settings handed to every stack of the app. */
export type Environment = {
  projectName: string;
  ioTCertificateName: string;
};

const app = new cdk.App();

// Both values are looked up in the CDK context.
const sharedProps: Environment = {
  projectName: app.node.tryGetContext('projectName'),
  ioTCertificateName: app.node.tryGetContext('ioTCertificateName'),
};

const lambdaStack = new LambdaStack(app, 'LambdaStack', sharedProps);

// The analytics stack receives the name of the function created above.
const iotAnalyticsStack = new IotAnalyticsStack(app, 'IotAnalyticsStack', {
  ...sharedProps,
  pipelineLambdaActivityFunctionName: lambdaStack.function.functionName,
});

// The IoT Core stack receives the name of the channel created above.
const channelName = iotAnalyticsStack.channel.channelName as string;
const iotCoreStack = new IotCoreStack(app, 'IotCoreStack', {
  ...sharedProps,
  iotAnalyticsChannelName: channelName,
});

// Declare explicitly that IotCoreStack depends on IotAnalyticsStack.
iotCoreStack.addDependency(iotAnalyticsStack);
デプロイ
下記でデプロイします。
# Install dependencies and build the project.
$ yarn install
$ yarn build

# Bootstrap the environment for CDK.
$ cdk bootstrap

# Pass the certificate name through the CDK context and deploy IotCoreStack.
# The expansion is quoted so the value is always passed as a single argument.
$ export AWS_IOT_CERTIFICATE_NAME="<証明書の名前>"
$ cdk deploy --context ioTCertificateName="${AWS_IOT_CERTIFICATE_NAME}" IotCoreStack
`AWS_IOT_CERTIFICATE_NAME` は AWS IoT Core で作った証明書の名前です。この値は証明書 ARN の `cert/` 以降の部分として使用されます。
データを Pub したり可視化したり
前回の記事で、データを Pub して、可視化するまでの流れを記載しているので、そちらをご参照ください。
おわりに
前回、CloudFormation で頑張った分、今回はあまり時間をかけずに構築することができました。
AWS IoT Core、AWS IoT Analytics 両方とも high-level construct に対応していなかったので、CloudFormation と同じ構造で設定していく必要があり、コード量としてはあまり変わりませんでした。CDK の今後のアップデートに期待したいですね。
今回は以上になります。最後まで読んで頂きありがとうございました!